package com.tl.spark.scala

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.{ConnectionFactory, RegionLocator, Result, Table}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, KeyValue, TableName}
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object BulkLoadHbase {

  /**
   * Bulk-loads pre-generated HFiles from an HDFS directory into an existing
   * HBase table using `LoadIncrementalHFiles` (no MapReduce/Spark job needed
   * for the load step itself).
   *
   * @param args optional — `args(0)` overrides the default HDFS directory
   *             containing the HFiles to load
   */
  def main(args: Array[String]): Unit = {
    // HBase client configuration. Setting the ZooKeeper quorum in code avoids
    // depending on an hbase-site.xml being present on the classpath.
    val conf = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM, "db01,db02,db03")
    // ZooKeeper client port (2181 is the default; set explicitly for clarity)
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    // Raise the per-region/per-family HFile cap so large loads don't abort,
    // and retry failed bulk-load attempts a few times before giving up.
    conf.setInt("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", 2048)
    conf.setInt("hbase.bulkload.retries.number", 10)

    // Target HBase table (must already exist with the right column families).
    val tableName = TableName.valueOf("CN_FUL_DB")

    // HDFS directory holding the HFiles to load; overridable via args(0).
    val path =
      if (args.nonEmpty) args(0)
      else "hdfs://db03:9000/hbasebulkload/2018-12-04/parse_xml_20181204004947642"

    // Open the cluster connection; the master address is resolved via conf.
    // All HBase resources are closed in reverse order of acquisition.
    val conn = ConnectionFactory.createConnection(conf)
    try {
      val table: Table = conn.getTable(tableName)
      try {
        // Region distribution of the target table — the bulk loader uses it
        // to split/assign HFiles to the right regions.
        val regionLocator: RegionLocator = conn.getRegionLocator(tableName)
        println(regionLocator.getAllRegionLocations)

        val admin = conn.getAdmin
        try {
          val bulkLoader = new LoadIncrementalHFiles(conf)
          bulkLoader.doBulkLoad(new Path(path), admin, table, regionLocator)
        } finally {
          admin.close()
        }
      } finally {
        table.close()
      }
    } finally {
      conn.close()
    }
  }
}
